fix(agents-acp): persist user event message before forwarding to ACP #333
declan-scale wants to merge 1 commit into
When a brand-new task receives its first event/send, the agent runtime was the only thing materializing the user's TaskMessage, and that write races against any messages the agent emits from its task-create handler (e.g. a startup "creating sandbox..." data row). BSON Date is millisecond-precision, so the two writes can land in the same millisecond with no defined order, and even ordered writes can place the agent's startup row first if event_send takes round-trip time to reach the agent. The user-visible symptom is the agent appearing to speak before the user.
Write the user-authored event payload as a TaskMessage on the platform side before forwarding to ACP, with created_at clamped to max(now, latest_existing_created_at + 1ms). The 1ms bump matches the staggering used by append_messages so the stored ordering is durable under BSON's millisecond precision.
Only user-authored event payloads are persisted on this path; agent- and system-authored event payloads still flow through as events without producing a platform-side message row, so the agent runtime remains the owner of agent message writes.
hmm isn't this by design tho? we want to be able to modify the user's messages? or nah is that like a configurable thing that wasn't worth it?
    latest = await self.task_message_service.get_messages(
        task_id=task_id,
        limit=1,
        page_number=1,
        order_by="created_at",
        order_direction="desc",
    )
    now = datetime.now(UTC)
    if latest and latest[0].created_at is not None:
        candidate = latest[0].created_at + timedelta(milliseconds=1)
        clamped_created_at = max(now, candidate)
    else:
        clamped_created_at = now

    return await self.task_message_service.append_message(
TOCTOU window in timestamp clamp
There is a read-then-write gap between get_messages (which fetches the latest created_at) and append_message (which writes with the clamped timestamp). If the agent emits another startup message between those two awaits, it will land with a wall-clock timestamp that is ≥ latest[0].created_at + 1ms, and the platform-written user message may end up with the same or an earlier millisecond than that new agent message — reproducing the original ordering ambiguity within the narrow window. The PR description already defers a complete fix (per-task monotonic sequence numbers) as a follow-up, so this is a known limitation, but it's worth making explicit so future work can close the gap.
Path: agentex/src/domain/use_cases/agents_acp_use_case.py
Line: 885-899
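The deferred follow-up named in this comment, per-task monotonic sequence numbers, could look roughly like the sketch below. It is an in-memory illustration only: `InMemoryTaskSequencer` and `next_seq` are hypothetical names, and a real implementation would presumably allocate the number with an atomic operation in the database rather than an `asyncio.Lock`.

```python
import asyncio
from collections import defaultdict


class InMemoryTaskSequencer:
    """Hypothetical stand-in for a per-task counter kept in the database."""

    def __init__(self) -> None:
        self._next = defaultdict(int)
        self._lock = asyncio.Lock()

    async def next_seq(self, task_id: str) -> int:
        # Allocation is one atomic step, so there is no read-then-write gap
        # for a concurrent writer to slip into.
        async with self._lock:
            self._next[task_id] += 1
            return self._next[task_id]


async def main() -> list[int]:
    sequencer = InMemoryTaskSequencer()
    # Simulate the platform and the agent writing to the same task concurrently.
    return await asyncio.gather(
        *(sequencer.next_seq("task-1") for _ in range(5))
    )


seqs = asyncio.run(main())
print(sorted(seqs))
```

Ordering by such a sequence number instead of `created_at` would not depend on wall-clock timestamps at all, which is why it closes the window that the timestamp clamp only narrows.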
Problem
In the custom-agents UI, the first turn of a brand-new task sometimes
renders an agent-authored message above the user's first message. The
expected order is user-input first, then everything the agent does in
response to it.
For agentic / async agents, the user's first input arrives as an
`event/send` RPC. Today the agent runtime is the one that materializes
that input into a `TaskMessage` after it processes the forwarded event.
That write races against any messages the agent emits from its
task-create handler (e.g. a startup "creating sandbox..." data row):
- BSON `Date` is millisecond-precision, so two writes in the same
  millisecond have no defined order.
- Even with ordered writes, the agent's startup
  row can be timestamped earlier than the user row because the event has
  to round-trip from the platform to the agent before the agent persists
  the user message.
The UI sorts by `(created_at, _id)` and renders strictly in that order,
so the agent visibly speaks before the user.
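A minimal sketch of the same-millisecond case, assuming plain dicts in place of stored messages and short strings in place of real `_id` values (both are illustrative stand-ins, as is the `to_bson_millis` helper):

```python
from datetime import datetime, timedelta, timezone


def to_bson_millis(ts: datetime) -> datetime:
    """Truncate a datetime to millisecond precision, as BSON Date stores it."""
    return ts.replace(microsecond=(ts.microsecond // 1000) * 1000)


base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# Hypothetical rows: the user message is written 400 microseconds BEFORE the
# agent's startup row, but happens to carry the larger _id.
user_row = {"_id": "b", "author": "user",
            "created_at": base + timedelta(microseconds=100)}
agent_row = {"_id": "a", "author": "agent",
             "created_at": base + timedelta(microseconds=500)}

# Both timestamps collapse to the same millisecond once stored.
stored = [
    {**row, "created_at": to_bson_millis(row["created_at"])}
    for row in (user_row, agent_row)
]

# Sort the way the UI is described as sorting: by (created_at, _id).
rendered = sorted(stored, key=lambda r: (r["created_at"], r["_id"]))
order = [r["author"] for r in rendered]
print(order)
```

Once the two timestamps are equal, the `_id` tie-break alone decides the order, and nothing ties `_id` order to the order in which the user and agent actually acted.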
Fix
In `_handle_event_send`, when the event payload is user-authored,
persist it as a `TaskMessage` on the platform side before forwarding
the event to ACP, with `created_at` clamped to
`max(now, latest_existing_created_at + 1ms)`.
The 1ms bump matches the staggering used by `append_messages` so the
stored ordering is durable against BSON's millisecond precision, and
the synchronous platform-side write closes the round-trip window: the
user row is in the DB before the agent ever sees the event.
Only user-authored payloads are persisted on this path. Agent- /
system-authored event payloads still flow through
`create_event_and_forward_to_acp` without producing a platform-side
message row, so the agent runtime
remains the owner of agent message writes.
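The clamp rule can be sketched as a small pure function. This is an illustration of the rule as described here, not the code in the PR; the name `clamp_created_at` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def clamp_created_at(now: datetime, latest_created_at: Optional[datetime]) -> datetime:
    """Return max(now, latest + 1ms), or now when the task has no messages."""
    if latest_created_at is None:
        return now
    return max(now, latest_created_at + timedelta(milliseconds=1))


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# Empty task: wall-clock time is used unchanged.
empty_case = clamp_created_at(now, None)

# Latest row shares the current instant: the user row is bumped by 1ms.
same_instant_case = clamp_created_at(now, now)

# Latest row is older than now: wall-clock time already sorts after it.
older_case = clamp_created_at(now, now - timedelta(seconds=5))

print(empty_case, same_instant_case, older_case)
```

The result is never earlier than `now` and, when a prior row exists, is always at least one millisecond after it, so the two rows cannot collapse to the same stored `created_at`.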
Tests
Added three unit tests under `TestAgentsACPUseCase` exercising the new
behavior against real Postgres + MongoDB testcontainers:
- `test_handle_event_send_persists_user_message_with_clamped_created_at`:
  pre-seeds an agent-authored "sandbox create" row stamped at now,
  then sends a user-authored event at the same instant, and asserts the
  user row is timestamped strictly after the agent row by at least 1ms.
- `test_handle_event_send_persists_user_message_when_task_is_empty`:
  no pre-existing messages; the user message is stamped with
  wall-clock, no clamp bump needed, and is the only message present.
- `test_handle_event_send_skips_persistence_for_agent_authored_event`:
  an `author=AGENT` event payload is forwarded normally but does NOT
  produce a platform-side `TaskMessage`.
Out of scope / follow-ups
- Per-task monotonic sequence numbers (schema change + backfill).
- [...] `Last-Event-ID` + removing the cache-skip on reconnect
  (separate ticket; fixes a sibling reconciliation bug).
- [...] in `use-task-messages` / `use-task-subscription`.
- [...] `streaming_status` transitions to a terminal state on worker
  death / cancel.
Validation
- [...] CI to enforce.
Test plan for reviewer
- [...] a startup data row and confirm the user message is always rendered
  first.
- [...] grouping behavior is unchanged for non-first-turn cases.
- [...] message do not produce duplicate user rows (default
  `AsyncBaseACP` template does not; custom handlers should be audited).
Greptile Summary
Fixes a race condition in the custom-agents UI where an agent's startup message could appear before the user's first message by persisting the user-authored event payload as a `TaskMessage` on the platform side, with a `max(now, latest + 1ms)` clamped timestamp, before forwarding the event to ACP.
`_handle_event_send` now calls a new `_append_user_event_message` helper for `author=USER` payloads only; agent/system payloads are unchanged and still flow through `create_event_and_forward_to_acp` without producing a platform-side row.
Confidence Score: 4/5
Safe to merge with awareness of the partial-failure scenario introduced on the new persistence path.
The core ordering fix is sound and well-tested. However, the new MongoDB write in `_handle_event_send` is not atomic with the subsequent Postgres event creation and ACP forward: if either of those steps fails, the user message row is durably written with no event sent to the agent, and a client retry will write a second user message. This is a real regression in the error path that was clean before this change.
The `_handle_event_send` method in `agents_acp_use_case.py` warrants a second look around error handling for the new MongoDB write step.
Important Files Changed
[...] `_handle_event_send` before ACP forwarding; introduces a partial-failure window where the MongoDB write succeeds but the subsequent ACP forward can fail, leaving an orphaned user message that duplicates on retry.
Sequence Diagram
    %%{init: {'theme': 'neutral'}}%%
    sequenceDiagram
        participant Client
        participant Platform as _handle_event_send
        participant MongoDB
        participant Postgres
        participant ACP as Agent (ACP)
        Client->>Platform: "event/send (author=USER)"
        Platform->>MongoDB: _append_user_event_message (created_at clamped)
        MongoDB-->>Platform: TaskMessage persisted
        Platform->>Postgres: create_event_and_forward_to_acp
        Postgres-->>Platform: EventEntity created
        Platform->>ACP: send_event
        ACP-->>Platform: ack
        Platform-->>Client: EventEntity
        note over MongoDB,Postgres: If ACP forward fails, MongoDB row is orphaned and a client retry creates a duplicate user message

Reviews (2): Last reviewed commit: "fix(agents-acp): persist user event mess..."